package com.ld.shieldsb.common.core.queue;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import com.ld.shieldsb.common.core.collections.ListUtils;

import lombok.extern.slf4j.Slf4j;

//无需恢复执行队列（每个程序重新启动，不会自动加载上次未执行完数据）
@Slf4j
public abstract class NoRecoverRunningQueue<T> {

    private BlockingQueue<QueueEntry<T>> queue;
    private Map<String, T> modelMap = new LinkedHashMap<>(); // 原先是hashMap 不能够满足日志插入的执行顺序

    private long period = 0;
    private volatile boolean isShutdown = false;
    private volatile boolean isRunning = true;

    private Thread thread = new Thread() {
        private int tryCount = 0;// 记录modelMap为空时处理线程的尝试次数

        @Override
        public void run() {
            while (true) {
                try {
                    queue2Map();

                    if (modelMap.isEmpty()) {
                        if (isShutdown) {
                            log.warn("队列“" + NoRecoverRunningQueue.this.getClass().getSimpleName() + "”执行完毕！");
                            return;
                        } else {
                            tryCount++;
                            if (tryCount == 10) {// 最多尝试10次（尽量让处理线程不wait）
                                synchronized (thread) {// 必须先synchronized
                                    isRunning = false;
                                    log.info("■■处理线程" + NoRecoverRunningQueue.this.getClass().getSimpleName() + "开始wait()...");
                                    wait();
                                    isRunning = true;// 执行notify时wait方法不抛异常，所以标识位在此处进行修改
                                    tryCount = 0;// 继续执行并重置tryCount
                                    log.info("■■处理线程" + NoRecoverRunningQueue.this.getClass().getSimpleName() + "结束wait()，继续执行！");
                                }
                            } else {
                                Thread.sleep(1000);// 休眠1秒
                            }
                            continue;
                        }
                    } else {
                        tryCount = 0;// modelMap不为空则重置tryCount
                    }

                    String[] keys = modelMap.keySet().toArray(new String[0]);
                    for (String key : keys) {
                        T model = modelMap.remove(key);
                        if (model != null) {
                            NoRecoverRunningQueue.this.doing(model);
                            saveModelMap(modelMap);
                        }
                    }
                    if (period != 0) {
                        Thread.sleep(period);
                        log.info("队列" + NoRecoverRunningQueue.this.getClass().getSimpleName() + "按要求暂停一段时间" + period + "毫秒");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    log.error("队列“" + NoRecoverRunningQueue.this.getClass().getSimpleName() + "”执行发生异常。", e);
                }
            }
        }

        // 将queue放入Map
        private void queue2Map() {
            log.info("处理队列" + NoRecoverRunningQueue.this.getClass().getSimpleName() + "，当前长度为：" + queue.size());
            for (int i = 0, len = queue.size(); i < len; i++) {
                try {
                    QueueEntry<T> queueEntry = queue.take();
                    T newModel = queueEntry.getModel();
                    T model = newModel;
                    T oldModel = modelMap.get(queueEntry.getKey());
                    if (oldModel != null) {
                        model = mergeModel(oldModel, newModel);
                    }
                    String key = queueEntry.getKey();
                    modelMap.put(key, model);
                    saveQuque(queue);
                    saveModelMap(modelMap);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    };

    /**
     * 默认创建一个队列长度为10000，线程数为1的自动运行队列
     */
    protected NoRecoverRunningQueue() {
        this(10000);
    }

    protected NoRecoverRunningQueue(int queueSize) {
        this(queueSize, 0);
    }

    protected NoRecoverRunningQueue(int queueSize, int period) {
        queue = new LinkedBlockingQueue<>(queueSize);
        this.period = period;
        thread.setDaemon(true);
        thread.start();
        NoRecoverRunningQueue.log.info("队列“" + getClass().getSimpleName() + "”创建，执行队列开始运行...");
    }

    protected void init(Map<String, T> modelMap, List<QueueEntry<T>> queue) {
        if (modelMap != null) {
            this.modelMap.putAll(modelMap);
        }
        if (ListUtils.isNotEmpty(queue)) {
            this.queue.addAll(queue);
        }
        synchronized (thread) {// 必须先synchronized
            thread.notify();// 唤醒
        }
    }

    public boolean put(String key, T model) {
        if (isShutdown) {
            log.warn("执行队列" + NoRecoverRunningQueue.this.getClass().getSimpleName() + "已经关闭，无法添加新记录");
            return false;
        }
        try {
            queue.put(new QueueEntry<T>(key, model));
            saveQuque(queue);
            if (!isRunning) {// 如果处理线程没有在执行（执行了wait）
                synchronized (thread) {// 必须先synchronized
                    log.info("◆唤醒执行队列" + NoRecoverRunningQueue.this.getClass().getSimpleName());
                    thread.notify();// 唤醒
                }
            }
            return true;
        } catch (InterruptedException e) {
            log.error("", e);
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // 如果队列中key进行合并操作，默认采用新数据，可重写
    protected T mergeModel(T oldModel, T newModel) {
        return newModel;
    }

    public boolean isEmpty() {
        return queue.isEmpty() && modelMap.isEmpty();
    }

    public int size() {
        return queue.size() + modelMap.size();
    }

    protected void saveQuque(BlockingQueue<QueueEntry<T>> queue) {
    }

    protected void saveModelMap(Map<String, T> modelMap) {
    }

    public void shutdown() {
        isShutdown = true;
        synchronized (thread) {// 必须先synchronized
            thread.notify();// 唤醒
        }
    }

    // 每次执行方法
    protected abstract void doing(T model) throws Exception;

    public Map<String, T> getModelMap() {
        return modelMap;
    }

    protected void clearModelMap() {
        modelMap.clear();
    }
}